rabbitmq

tips:mac安装rabbitmq报错解决

  • # 替换brew.git
    cd "$(brew --repo)"
    git remote set-url origin https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/brew.git
    
    # 替换homebrew-core.git
    cd "$(brew --repo)/Library/Taps/homebrew/homebrew-core"
    git remote set-url origin https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/homebrew-core.git
    
    # 刷新源
    brew update
  • 如果还报错先检查网络问题,是否翻墙等

  • 再检查依赖是否全部安装

AMQP和JMS

MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

两者间的区别和联系:

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

常见MQ产品

  • ActiveMQ:基于JMS

  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量

基本使用

工作原理

img

组成部分说明:

  • Broker(Server):消息队列服务进程(接受服务端的链接),实现AMQP实体服务

  • Connection:链接,应用程序与Broker的网络连接TCP/IP3次握手 4次挥手

  • Channel:网络信道,所有操作凑是基于信道操作的,信道是连接内的,客户端可以建立多个信道,每一个信道代表一个会话任务

  • Message:消息,服务于应用程序之间传送的数据,由Properties和Body组成,Properties是对消息的修饰,比如消息的优先级,延迟等高级特性,Body是消息体的内容

  • Virtual Host:虚拟地址,类似文件夹,数据库的概念,为了区分管理交换机和队列,同一个虚拟地址内不能出现同名队列和交换机

  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列(根据路由key发送消息绑定的队列),对消息进行过虑(如果一个队列没有指定绑定的交换机的名字,就会绑定一个默认的交换机)。

  • Bindings:交换机和队列的虚拟连接

  • Routing key:是一个路由规则,虚拟机可以通过他确定由什么路由一个消息

  • Queue:消息队列,存储消息的队列,生产者把消息发送给交换机,交换机通过队列转发给消费者

  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送

  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

生产者发送消息流程:

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

6、ack回复

写demo的时候注意点

  • 连接时所需的用户需提前创建
  • 连接虚拟机时需给用户对这个虚拟机的权限
  • connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED 当出现这个报错就是这个用户没有对这个虚拟机的权限

java实现

1、安装

brew install rabbitmq

2、启动及关闭RabbitMQ服务

前台启动 

sudo ./rabbitmq-server    或

sudo su
/usr/local/Cellar/rabbitmq/3.7.8/sbin/rabbitmq-server -detacted

后台启动 sudo ./rabbitmq-server -detached

后台关闭 sudo ./rabbitmqctl stop

3 、登录
http://127.0.0.1:15672    guest\guest

4、创建用户与虚拟机并授权
rabbitmqctl add_user USER PASSWORD ##创建用户
rabbitmqctl change_password USER PASSWORD ##修改密码
rabbitmqctl set_user_tags USER administrator  ##设置为管理员
rabbitmqctl add_vhost VHOST ##添加虚拟机
rabbitmqctl set_permissions -p VHOST USER ".*" ".*" ".*"  ##给用户分配虚拟主机权限

创建连接

package com.hz.mq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class rabbitmqUtil {
    /**
     * 建立与RabbitMQ的连接
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("mqv");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        factory.setUsername("hz");
        factory.setPassword("123456");
        // 通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    }

}

生产者

package com.hz.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class MqProducer {


    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 1、获取到连接
        Connection connection = rabbitmqUtil.getConnection();
        // 2、从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 3、声明(创建)队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4、消息内容
        String message = "Hello World!";
        // 向指定的队列中发送消息
        //参数:String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * 参数明细:
         * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
         * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3、props,消息的属性
         * 4、body,消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
        channel.close();
        connection.close();
    }

}

消费者

package com.hz.mq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class MqConsumer {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = rabbitmqUtil.getConnection();
        //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 声明队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //实现消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            /**
             * 当接收到消息后此方法将被调用
             * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
             * @param envelope 信封,通过envelope
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息体
                String msg = new String(body,"utf-8");
                System.out.println(" [x] received : " + msg + "!");
            }
        };

        // 监听队列,第二个参数:是否自动进行消息确认。
        //参数:String queue, boolean autoAck, Consumer callback
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }


}

ack确认机制

自动ack会在处理消息过程中即使出现异常,也会消息确认

在消息重要的情况下切换手动ack,手动ack后要主动发送ack,否在消息状态为未确认(unacked),在关闭消费者后,状态会回退到ready

消息队列发送后,在消息消费过程中出现异常处理:第一次执行,报错,捕获,重试,第二次执行,报错,捕获记录错误日志到数据库,确认消费

配置direct交换机(直连交换机)

package com.dq.config.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 定义队列名和交换机
 */
@Configuration
public class DirectMqConfig {

    /**
     * 交换机名称
     */
    public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";

    /**
     * 绑定key,交换机绑定队列时需要指定
     */
    public static final String BINGDING_KEY_TEST1 = "direct_key1";
    public static final String BINGDING_KEY_TEST2 = "direct_key2";
    public static final String BINGDING_KEY_TEST3 = "direct_key3";

    /**
     * 队列名称
     */
    public static final String QUEUE_TEST1 = "addProductUsderBid";
    public static final String QUEUE_TEST2 = "updateProduct1";
    public static final String QUEUE_TEST3 = "updateStateProduct1";

    /**
     * 构建DirectExchange交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        // 支持持久化,长期不用补删除
        return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
    }

    /**
     * 构建序列
     *
     * @return
     */
    @Bean
    public Queue test1Queue() {
        // 支持持久化
        return new Queue(QUEUE_TEST1, true);
    }

    @Bean
    public Queue test2Queue() {
        // 支持持久化
        return new Queue(QUEUE_TEST2, true);
    }

    @Bean
    public Queue test3Queue() {
        // 支持持久化
        return new Queue(QUEUE_TEST3, true);
    }

    /**
     * 绑定交交换机和
     *
     * @return
     */
    @Bean
    public Binding test1Binding() {
        return BindingBuilder.bind(test1Queue()).to(directExchange()).with(BINGDING_KEY_TEST1);
    }

    @Bean
    public Binding test2Binding() {
        return BindingBuilder.bind(test2Queue()).to(directExchange()).with(BINGDING_KEY_TEST2);
    }

    @Bean
    public Binding test3Binding() {
        return BindingBuilder.bind(test3Queue()).to(directExchange()).with(BINGDING_KEY_TEST3);
    }

    /**
     * 实例化操作模板
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //必须为true,否则无法触发returnedMessage回调,消息丢失
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

}

消息分发与能者多劳

当出现两个消费者时,消费者2处理消息的速度远小于消费者1的情况下,他们会受到同样数量的消息

此时消费者1有大量时间处于空闲状态,可以通过 BasicQos 方法设置prefetchCount = 1

这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。

订阅(fanout)模式

路由匹配(DIrect)模式

模糊模糊匹配(Topic)模式

携带信息匹配(Headers)模式

死信队列

延迟队列